[GOBBLIN-2135] Cache Gobblin YARN application jars #4030
…epeatedly upload jars and files to hdfs
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
LOG.info(String.format("Adding %s to classpath", destJarFile));
DistributedCache.addFileToClassPath(destJarFile, conf, this.fs);
} else {
LOG.error("Failed to upload jar file: " + status.getPath());
I don't find the prior code throwing an error...
nonetheless, should everything continue on w/ just some error logs?
shouldn't we instead fail the overall job because presumably necessary jars won't be there?
Possibly we should throw an error, but given that another job could be uploading the same jars, I think it's better to let the job attempt to run. If the job fails, it should emit the failed event anyway.
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/HdfsJarUploadUtils.java
* @return
* @throws IOException
*/
public static boolean uploadJarToHdfs(FileSystem fs, FileStatus localJar, int jarFileMaximumRetry, Path destJarFile) throws IOException {
jarFileMaximumRetry => simply maxAttempts?
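To illustrate why `maxAttempts` reads more naturally than `jarFileMaximumRetry`, here is a minimal sketch of a bounded retry loop. It is not the PR's implementation: `RetryingUploadSketch`, `Upload`, and `uploadWithRetries` are hypothetical names, and the `Upload` callback stands in for the real copy to HDFS.

```java
import java.io.IOException;

/** Hypothetical sketch: retry an upload until it succeeds or attempts run out. */
final class RetryingUploadSketch {

  /** Stand-in for the real copy to HDFS; returns normally on success. */
  interface Upload {
    void run() throws IOException;
  }

  /**
   * Runs the upload at most maxAttempts times in total.
   *
   * @return true if some attempt succeeded, false if every attempt failed
   */
  static boolean uploadWithRetries(Upload upload, int maxAttempts) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        upload.run();
        return true;
      } catch (IOException e) {
        // Log and fall through to the next attempt, if any remain.
        System.err.printf("Upload attempt %d of %d failed: %s%n",
            attempt, maxAttempts, e.getMessage());
      }
    }
    return false;
  }
}
```

With this naming, the parameter counts total attempts (including the first), so there is no ambiguity about whether "retry 3" means three or four tries.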
}
if (this.jarCacheEnabled) {
addContainerLocalResources(new Path(jarCacheDir, GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME), resourceMap);
if (this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_FILES_LOCAL_KEY)) {
I don't quite understand this key. you're checking it here in two different conditionals, but in neither one do you actually use (or even check to see) what value it holds
Oops should have used gobblin.yarn.container.jars key
this.appLauncherMode = ConfigUtils.getString(config, GOBBLIN_YARN_APP_LAUNCHER_MODE, DEFAULT_GOBBLIN_YARN_APP_LAUNCHER_MODE);
this.appLauncherMode = ConfigUtils.getString(this.config, GOBBLIN_YARN_APP_LAUNCHER_MODE, DEFAULT_GOBBLIN_YARN_APP_LAUNCHER_MODE);
this.jarCacheEnabled = ConfigUtils.getBoolean(config, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT);
NBD, but you just updated the two above to be this.config, but only use config here :)
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
}
if (resourceMap.isPresent()) {
YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, LocalResourceType.FILE, resourceMap.get());
LOGGER.warn("Failed to upload jar file {} to HDFS", srcFilePath);
again, should this be an actual failure, not merely logging?
...or do we believe there are times when it's actually OK to continue?
I think the concern is valid, but I'm currently trying to get parity with the MR job launcher. Since concurrent executions could always be adding the jars instead, it can be worthwhile to just attempt the job; it will fail loudly anyway if the jars weren't uploaded properly.
I guess that's fine to start. how about a
// TODO: decide whether to fail-fast here, given the job may be unable to run w/o it
if (this.jarCacheEnabled) {
Path jarCachePath = YarnHelixUtils.calculateJarCachePath(this.config);
// Retain at least the current and last month's jars to handle executions running for ~30 days max
boolean cleanedSuccessfully = YarnHelixUtils.retainKLatestJarCachePaths(jarCachePath.getParent(), 2, this.fs);
does this run before or after caching jars? e.g. do we save only two prior AND THEN potentially add one more or we've added any new one already prior to retention paring it down to two?
It runs after caching the jars. But it uses a consistent YARN_APPLICATION_LAUNCHER_START_TIME_KEY in the job, so no matter how many times we look at the cache path, it creates at most one path, and that path is the one where the jars are being uploaded.
would be great to add this info to a code comment here (in the caller, since it's the combination of the two being used, so less effective in either's javadoc). the crux is that retention of K=2 won't save us in cases where K=3 might exceed any FS quotas.
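The point about a consistent start time can be sketched as follows. This is an illustration under assumptions, not the PR's `calculateJarCachePath`: the `yyyy-MM` directory suffix, the UTC zone, and the names `JarCachePathSketch` and `monthlyCachePath` are all hypothetical. The idea is only that a path derived from one fixed launcher start time is the same on every call, even if the calendar month rolls over while the launcher is running.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical sketch: derive a monthly cache directory from a fixed start time. */
final class JarCachePathSketch {

  // Assumed layout: <cacheRoot>/<yyyy-MM>; the real suffix format may differ.
  private static final DateTimeFormatter MONTH =
      DateTimeFormatter.ofPattern("yyyy-MM").withZone(ZoneOffset.UTC);

  /**
   * Because the month is computed from the launcher start time rather than
   * from "now", repeated calls within one launch always yield the same path.
   */
  static String monthlyCachePath(String cacheRoot, long launcherStartTimeMillis) {
    return cacheRoot + "/" + MONTH.format(Instant.ofEpochMilli(launcherStartTimeMillis));
  }
}
```

Under this assumed layout, retention with K=2 runs after the current month's directory already exists, so the current directory counts as one of the two retained.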
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
if (jarDirs.size() > k) {
return fs.delete(jarDirs.get(0).getPath(), true);
since this is not a loop, it seems it would delete at most one dir even if there are more than 1 more than k. is that ok? if so, document in javadoc
Changed it to use a loop, for consistency with naming convention.
Codecov Report
All modified and coverable lines are covered by tests ✅
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #4030      +/-   ##
============================================
+ Coverage     45.86%   55.31%    +9.45%
+ Complexity     3257     1582     -1675
============================================
  Files           707      307      -400
  Lines         27865    10580    -17285
  Branches       2796     1069     -1727
============================================
- Hits          12779     5852     -6927
+ Misses        14008     4223     -9785
+ Partials       1078      505      -573
23a9b21 to 88fdddc
* @param localJarPath
* @param unsharedJarsDir
* @param jarCacheDir
* @return
* @throws IOException
*/
public static Path calculateDestJarFile(FileSystem fs, FileStatus localJar, Path unsharedJarsDir, Path jarCacheDir) throws IOException {
Path uploadDir = localJar.getPath().getName().contains("SNAPSHOT") ? unsharedJarsDir : jarCacheDir;
Path destJarFile = new Path(fs.makeQualified(uploadDir), localJar.getPath().getName());
public static Path calculateDestJarFilePath(FileSystem fs, String localJarPath, Path unsharedJarsDir, Path jarCacheDir) throws IOException {
Path uploadDir = localJarPath.contains("SNAPSHOT") ? unsharedJarsDir : jarCacheDir;
seeing this invoked, localJarPath looks more like localJarBasename, not the (full) path.
(or simply jarName)
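The routing rule under discussion can be sketched with the suggested `jarName` parameter. This is a simplified stand-in, not the PR's method: it uses `java.nio.file.Path` instead of Hadoop's `Path` and `FileSystem.makeQualified`, and `DestJarPathSketch` and `destJarPath` are hypothetical names.

```java
import java.nio.file.Path;

/** Hypothetical sketch: choose the upload directory from the jar's base name. */
final class DestJarPathSketch {

  /**
   * SNAPSHOT jars are mutable, so they go to the per-execution (unshared)
   * directory; all other jars go to the shared jar cache.
   *
   * @param jarName base name of the jar, e.g. "foo-1.0.jar" (not a full path)
   */
  static Path destJarPath(String jarName, Path unsharedJarsDir, Path jarCacheDir) {
    Path uploadDir = jarName.contains("SNAPSHOT") ? unsharedJarsDir : jarCacheDir;
    return uploadDir.resolve(jarName);
  }
}
```

Naming the parameter `jarName` makes it clear that the `contains("SNAPSHOT")` check applies to the file name only, so a directory that happens to contain "SNAPSHOT" in its path would not affect the routing.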
return fs.delete(jarDirs.get(0).getPath(), true);
boolean deletesSuccessful = true;
for (int i = 0; i < jarDirs.size() - k; i++) {
deletesSuccessful = deletesSuccessful && fs.delete(jarDirs.get(i).getPath(), true);
&=
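The `&=` suggestion is more than a style point: `a = a && f()` short-circuits, so once one delete fails, the remaining deletes are never attempted, whereas `a &= f()` always evaluates `f()`. A minimal sketch of the loop with `&=` follows. It assumes the directory list is sorted oldest first (as `jarDirs.get(0)` in the original suggests); `RetainLatestSketch` and `retainKLatest` are hypothetical names, and the `delete` predicate stands in for `fs.delete(path, true)`.

```java
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical sketch: keep the k newest cache directories, delete the rest. */
final class RetainLatestSketch {

  /**
   * @param dirsOldestFirst cache directories, assumed sorted oldest first
   * @param k               number of newest directories to keep
   * @param delete          stand-in for fs.delete(path, true); returns true on success
   * @return true only if every attempted delete succeeded
   */
  static boolean retainKLatest(List<String> dirsOldestFirst, int k, Predicate<String> delete) {
    boolean allDeleted = true;
    for (int i = 0; i < dirsOldestFirst.size() - k; i++) {
      // &= always evaluates the right-hand side, so one failed delete
      // does not prevent the remaining stale directories from being removed.
      allDeleted &= delete.test(dirsOldestFirst.get(i));
    }
    return allDeleted;
  }
}
```

With four directories and k=2, both of the two oldest are attempted even if the first delete fails, and the method still reports the failure.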
phet left a comment
Great work william - I can't wait to get this in and streamline every startup!
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
Gobblin YARN Application Launcher lacks some functionality available in MRJobLauncher. One of the biggest gaps in feature parity is jar caching: MRJobLauncher creates a monthly cache that is automatically cleaned up by executions running two months later.
YARN/MR requires uploading jars to HDFS. This step can be quite slow (~15 minutes for a sizeable job to get all the jars), and since many jobs share the same jars, it makes sense to cache them together and provide YARN only the shared path.
We also want to ensure that SNAPSHOT jars and other files are not uploaded to the cache, since, unlike released jar versions on Artifactory, they are not immutable.
This PR implements jar caching through 2 configurations: gobblin.yarn.jar.cache.enabled and gobblin.yarn.jar.cache.dir.
If gobblin.yarn.jar.cache.enabled=true, the launcher looks for the directory defined in gobblin.yarn.jar.cache.dir. Snapshot jars and other files are expected to be stored in a directory unique to the execution, so that they are not shared with other concurrent executions; only jars stored in the jar cache are shared.
Tests
Testing shows that this saves approximately 10 minutes of bootstrap time per job.
Commits